package com.lee.service;

import com.lee.constants.RabbitConstants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class OrderProducerService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate ;


    @PostConstruct
    public void setCallback() throws InterruptedException {
        // 设置返回回调
        rabbitTemplate.setReturnCallback(this);
        // 设置确认回调
        rabbitTemplate.setConfirmCallback(this);
        // 模拟消息发送
//        Runnable runnable = new Runnable() {
//            public void run() {
//                send("这是我发送的测试消息，测试id="+ UUID.randomUUID().toString());
//            }
//        };
//        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
//        scheduledExecutorService.scheduleAtFixedRate(runnable,20,5, TimeUnit.SECONDS);
        for (int x=0;x<500;x++) {
            Thread.sleep(100);
            send("这是我发送的测试消息，测试id="+ UUID.randomUUID().toString());
        }
    }

    private void send(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConstants.ORDER_EXCHANGE, RabbitConstants.ORDER_ROUTE_KEY, message, correlationData);
    }

    /**
     * 回调确认消息是否发送成功  confirm机制只保证消息到达交换机，不保证消息可以路由到正确的queue，如果交换机错误，就会触发confirm机制
     * @param correlationData
     * @param ack
     * @param s
     */
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        System.out.println("消息发送成功,发送ack确认,id="+correlationData.getId());
        if (ack){
            System.out.println("发送成功");
        }else {
            System.out.println("发送失败");
        }
    }

    /**
     *  Return 消息机制用于处理一个不可路由的消息。在某些情况下，如果我们在发送消息的是否，当前的 exchange 不存在或者指定路由 key 路由找不到，
     *      *  这个时候需要使用 Return 来监听这种不可达的消息
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息丢失, 没有投递成功");
    }

    // 监听死信队列
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitConstants.DEAD_QUEUE),exchange = @Exchange(value = RabbitConstants.DEAD_EXCHANGE),
            key = RabbitConstants.DEAD_ROUTE_KEY))
    public void deadQueueListener(Message message, Channel channel) throws InterruptedException, IOException {
        System.out.println("死信队列");
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        String msg = new String(message.getBody());
        System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
        //Thread.sleep(5000);
        // 发送ack给消息队列，收到消息了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

    }

    // 监听订单队列
    @RabbitListener(queues = RabbitConstants.ORDER_QUEUE)
    public void orderQueueListener(Message message, Channel channel) throws IOException, InterruptedException {
        System.out.println("正常队列");
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        String msg = new String(message.getBody());
        System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
        // 休眠 当队列消息ttl达到5000时，交由死信队列
        Thread.sleep(1000);
        //发送ack给消息队列,收到消息了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
